{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# fastgpu\n",
    "\n",
    "> API details"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# default_exp core"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "import os\n",
    "os.environ['CUDA_DEVICE_ORDER']='PCI_BUS_ID'\n",
    "from fastcore.all import *\n",
    "from pynvml import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#hide\n",
    "from nbdev.showdoc import *"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Overview"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here's what fastgpu does:\n",
    "\n",
    "1. poll `to_run`\n",
    "1. find first file\n",
    "1. check there's an available worker id\n",
    "1. move it to `running`\n",
    "1. handle the script\n",
    "   1. create lock file\n",
    "   1. redirect stdout/err to `out`\n",
    "   1. run it\n",
    "   1. when done, move it to `complete` or `failed`\n",
    "   1. unlock"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For demonstrating how to use `fastgpu`, we first create a directory to store our scripts and outputs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "path = Path('data')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "def setup_dirs(path):\n",
    "    \"Create and return the following subdirs of `path`: to_run running complete fail out\"\n",
    "    path.mkdir(exist_ok=True)\n",
    "    dirs = L(path/o for o in 'to_run running complete fail out'.split())    \n",
    "    for o in dirs: o.mkdir(exist_ok=True)\n",
    "    return dirs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These are all the subdirectories that are created for us. Your scripts go in `to_run`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "path_run,path_running,path_complete,path_fail,path_out = setup_dirs(path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's create a scripts directory with a couple of \"scripts\" (actually symlinks for this demo) in it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def _setup_test_env():\n",
    "    shutil.rmtree('data')\n",
    "    res = setup_dirs(path)\n",
    "    os.symlink(Path('test_scripts/script_succ.sh').absolute(), path_run/'script_succ.sh')\n",
    "    os.symlink(Path('test_scripts/script_fail.sh').absolute(), path_run/'script_fail.sh')\n",
    "    (path_run/'test_dir').mkdir(exist_ok=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_setup_test_env()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Helper functions for scripts"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These functions are used to find and run scripts, and move scripts to the appropriate subdirectory at the appropriate time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "def find_next_script(p):\n",
    "    \"Get the first script from `p` (in sorted order)\"\n",
    "    files = p.ls().sorted().filter(Self.is_file())\n",
    "    if files: return files[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_eq(find_next_script(path_run).name, 'script_fail.sh')\n",
    "assert not find_next_script(path_complete)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "def safe_rename(file, dest):\n",
    "    \"Move `file` to `dest`, prefixing a random uuid if there's a name conflict\"\n",
    "    to_name = dest/file.name\n",
    "    if to_name.exists():\n",
    "        u = uuid4()\n",
    "        to_name = dest/f'{name}-{u}'\n",
    "        warnings.warn(f'Using unique name {to_name}')\n",
    "    file.replace(to_name)\n",
    "    return to_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ResourcePoolBase -"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "class ResourcePoolBase():\n",
    "    def __init__(self, path):\n",
    "        self.path = Path(path)\n",
    "        setup_dirs(self.path)\n",
    "    \n",
    "    def _lockpath(self,ident): return self.path/f'{ident}.lock'\n",
    "    def _is_locked(self,ident): return self._lockpath(ident).exists()\n",
    "    def lock(self,ident, txt='locked'): self._lockpath(ident).write_text(str(txt))\n",
    "    def unlock(self,ident): return self._lockpath(ident).unlink() if self._is_locked(ident) else None\n",
    "    def is_available(self,ident): return not self._is_locked(ident)\n",
    "    def all_ids(self): raise NotImplementedError\n",
    "    def find_next(self): return first(o for o in self.all_ids() if self.is_available(o))\n",
    "    def lock_next(self):\n",
    "        ident = self.find_next()\n",
    "        if ident is None: return\n",
    "        self.lock(ident)\n",
    "        return ident\n",
    "\n",
    "    def _launch(self, script, ident, env):\n",
    "        with (self.path/'out'/f'{script.name}.stderr').open(\"w\") as stderr:\n",
    "            with (self.path/'out'/f'{script.name}.stdout').open(\"w\") as stdout:\n",
    "                process = subprocess.Popen(str(script), env=env, stdout=stdout, stderr=stderr)\n",
    "                self.lock(ident, process.pid)\n",
    "                return process.wait()\n",
    "\n",
    "    def _run(self, script, ident):\n",
    "        failed = False\n",
    "        env=copy(os.environ)\n",
    "        env['FASTGPU_ID'] = str(ident)\n",
    "        try: res = self._launch(script, ident, env=env)\n",
    "        except Exception as e: failed = str(e)\n",
    "        (self.path/'out'/f'{script.name}.exitcode').write_text(failed if failed else str(res))\n",
    "        dest = self.path/'fail' if failed or res else self.path/'complete'\n",
    "        finish_name = safe_rename(script, dest)\n",
    "        self.unlock(ident)\n",
    "\n",
    "    def run(self, *args, **kwargs):\n",
    "        thread = Thread(target=self._run, args=args, kwargs=kwargs)\n",
    "        thread.start()\n",
    "\n",
    "    def poll_scripts(self, poll_interval=0.1, exit_when_empty=True):\n",
    "        while True:\n",
    "            sleep(poll_interval)\n",
    "            script = find_next_script(self.path/'to_run')\n",
    "            if script is None:\n",
    "                if exit_when_empty: break\n",
    "                else: continue\n",
    "            ident = self.lock_next()\n",
    "            if ident is None: continue\n",
    "            run_name = safe_rename(script, self.path/'running')\n",
    "            self.run(run_name, ident)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "add_docs(ResourcePoolBase, \"Base class for locked access to list of idents\",\n",
    "         unlock=\"Remove lockfile for `ident`\",\n",
    "         lock=\"Create lockfile for `ident`\",\n",
    "         is_available=\"Is `ident` available\",\n",
    "         all_ids=\"All idents (abstract method)\",\n",
    "         find_next=\"Finds next available resource, or None\",\n",
    "         lock_next=\"Locks an available resource and returns its ident, or None\",\n",
    "         run=\"Run `script` using resource `ident`\",\n",
    "         poll_scripts=\"Poll `to_run` for scripts and run in parallel on available resources\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This abstract class locks and unlocks resources using lockfiles. Override `all_ids` to make the list of resources available. See `FixedWorkerPool` for a simple example and details on each method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "class FixedWorkerPool(ResourcePoolBase):\n",
    "    \"Vends locked access to fixed list of idents\"\n",
    "    def __init__(self, worker_ids, path):\n",
    "        super().__init__(path)\n",
    "        self.worker_ids = worker_ids\n",
    "    \n",
    "    def all_ids(self):\n",
    "        \"All available idents\"\n",
    "        return self.worker_ids"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The simplest possible `ResourcePoolBase` subclass - the resources are just a list of ids. For instance:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_setup_test_env()\n",
    "wp = FixedWorkerPool(L.range(4), path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.unlock\" class=\"doc_header\"><code>ResourcePoolBase.unlock</code><a href=\"__main__.py#L10\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.unlock</code>(**`ident`**)\n",
       "\n",
       "Remove lockfile for `ident`"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.unlock)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If there are no locks, this does nothing:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wp.unlock(0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.find_next\" class=\"doc_header\"><code>ResourcePoolBase.find_next</code><a href=\"__main__.py#L13\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.find_next</code>()\n",
       "\n",
       "Finds next available resource, or None"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.find_next)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Initially all resources are available (unlocked), so the first from the provided list will be returned:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_eq(wp.find_next(), 0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.lock\" class=\"doc_header\"><code>ResourcePoolBase.lock</code><a href=\"__main__.py#L9\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.lock</code>(**`ident`**, **`txt`**=*`'locked'`*)\n",
       "\n",
       "Create lockfile for `ident`"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.lock)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After locking the first resource, it is no longer returned next:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wp.lock(0)\n",
    "test_eq(wp.find_next(), 1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.lock_next\" class=\"doc_header\"><code>ResourcePoolBase.lock_next</code><a href=\"__main__.py#L14\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.lock_next</code>()\n",
       "\n",
       "Locks an available resource and returns its ident, or None"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.lock_next)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is the normal way to access a resource - it simply combines `find_next` and `lock`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wp.lock_next()\n",
    "test_eq(wp.find_next(), 2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.run\" class=\"doc_header\"><code>ResourcePoolBase.run</code><a href=\"__main__.py#L39\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.run</code>(**\\*`args`**, **\\*\\*`kwargs`**)\n",
       "\n",
       "Run `script` using resource `ident`"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.run)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_setup_test_env()\n",
    "wp = FixedWorkerPool(L.range(4), path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_setup_test_env()\n",
    "f = find_next_script(path_run)\n",
    "wp._run(f, 0)\n",
    "\n",
    "test_eq(find_next_script(path_run), path_run/'script_succ.sh')\n",
    "test_eq((path_out/'script_fail.sh.exitcode').read_text(), '1')\n",
    "assert (path_fail/'script_fail.sh').exists()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolBase.poll_scripts\" class=\"doc_header\"><code>ResourcePoolBase.poll_scripts</code><a href=\"__main__.py#L43\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolBase.poll_scripts</code>(**`poll_interval`**=*`0.1`*, **`exit_when_empty`**=*`True`*)\n",
       "\n",
       "Poll `to_run` for scripts and run in parallel on available resources"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(FixedWorkerPool.poll_scripts)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_setup_test_env()\n",
    "wp.poll_scripts()\n",
    "\n",
    "assert not find_next_script(path_run), find_next_script(path_run)\n",
    "test_eq((path_out/'script_fail.sh.exitcode').read_text(), '1')\n",
    "test_eq((path_out/'script_succ.sh.exitcode').read_text(), '0')\n",
    "assert not (path_run/'script_fail.sh').exists()\n",
    "assert (path_fail/'script_fail.sh').exists()\n",
    "assert (path_complete/'script_succ.sh').exists()\n",
    "test_eq((path_out/'script_succ.sh.stdout').read_text(), '0\\n')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## GPU"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#export\n",
    "class ResourcePoolGPU(ResourcePoolBase):\n",
    "    \"Vends locked access to NVIDIA GPUs\"\n",
    "    def __init__(self, path):\n",
    "        super().__init__(path)\n",
    "        nvmlInit()\n",
    "        # NVML doesn't respect CUDA_VISIBLE_DEVICES, so we need to query manually\n",
    "        envvar = os.environ.get(\"CUDA_VISIBLE_DEVICES\", '')\n",
    "        self.devs = (L(envvar.split(',')).map(int) if envvar!=''\n",
    "                     else L.range(nvmlDeviceGetCount()))\n",
    "        self.ids = L.range(self.devs)\n",
    "\n",
    "    def _launch(self, script, ident, env):\n",
    "        env['CUDA_VISIBLE_DEVICES'] = str(self.devs[ident])\n",
    "        return super()._launch(script, ident, env)\n",
    "#         with (self.path/'out'/f'{script.name}.stderr').open(\"w\") as stderr:\n",
    "#             with (self.path/'out'/f'{script.name}.stdout').open(\"w\") as stdout:\n",
    "#                 process = subprocess.Popen(str(script), env=env, stdout=stdout, stderr=stderr)\n",
    "#                 self.lock(ident, process.pid)\n",
    "#                 return process.wait()\n",
    "#         with (self.path/'out'/f'{script.name}.stderr').open(\"w\") as stderr:\n",
    "#             with (self.path/'out'/f'{script.name}.stdout').open(\"w\") as stdout:\n",
    "#                 return subprocess.call(str(script), env=env, stdout=stdout, stderr=stderr)\n",
    "\n",
    "    def is_available(self,ident):\n",
    "        \"If a GPU's used_memory is less than 1G and is running no procs then it will be regarded as available\"\n",
    "        if not super().is_available(ident): return False\n",
    "        device = nvmlDeviceGetHandleByIndex(self.devs[ident]) \n",
    "        if nvmlDeviceGetComputeRunningProcesses(device): return False\n",
    "        return nvmlDeviceGetMemoryInfo(device).used <= 1e9\n",
    "\n",
    "    def all_ids(self):\n",
    "        \"All GPUs\"\n",
    "        return self.ids"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# These only work if you have an NVIDIA GPU installed\n",
    "# wp = ResourcePoolGPU('data')\n",
    "# wp.find_next()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is a resource pool that uses [pynvml](https://pypi.org/project/pynvml/) to find GPUs that aren't being used (based on whether they have memory allocated). It is implemented by overriding two methods from `ResourcePoolBase`. Usage is identical to `FixedWorkerPool`, except that you don't need to pass in `worker_ids`, since available GPUs are considered to be the resource pool."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolGPU.is_available\" class=\"doc_header\"><code>ResourcePoolGPU.is_available</code><a href=\"__main__.py#L13\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolGPU.is_available</code>(**`ident`**)\n",
       "\n",
       "If a GPU's used_memory is less than 1G and is running no procs then it will be regarded as available"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(ResourcePoolGPU.is_available)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "<h4 id=\"ResourcePoolGPU.all_ids\" class=\"doc_header\"><code>ResourcePoolGPU.all_ids</code><a href=\"__main__.py#L20\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
       "> <code>ResourcePoolGPU.all_ids</code>()\n",
       "\n",
       "All GPUs"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "show_doc(ResourcePoolGPU.all_ids)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Export -"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Converted 00_core.ipynb.\n",
      "Converted 01_utils.ipynb.\n",
      "Converted index.ipynb.\n"
     ]
    }
   ],
   "source": [
    "#hide\n",
    "from nbdev.export import notebook2script\n",
    "notebook2script()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
